package com.dizang.cloud.collapser.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.springframework.stereotype.Component;

import com.dizang.cloud.dto.UserQueryDto;
import com.dizang.cloud.entity.User;

/**
 * Merges individual user lookups into batched queries using a queue and a scheduled thread pool.
 *
 * <p>Callers enqueue a request through {@link #getUserById(Long)} and block on its future. A
 * periodic task drains the queue, issues a single batch query for all collected ids and then
 * completes every waiting future with its result.
 *
 * @author kelvin.cai
 *
 */
@Component
public class UserBatchWithFutureServiceImpl {
    /** Blocking queue that accumulates pending requests between two batch runs. */
    private final LinkedBlockingDeque<UserQueryDto> requestQueue = new LinkedBlockingDeque<>();
    /** Number of threads in the scheduler pool. */
    private int threadNum = 1;
    /** Interval between two batch runs, in milliseconds. */
    private long period = 5000;
    /** Scheduler that runs the batch task; kept in a field so it can be shut down. */
    private ScheduledExecutorService scheduledExecutorService;

    /**
     * Looks up a single user by enqueueing the request and waiting for the next batch run.
     *
     * @param id id of the user to look up
     * @return the user with the given id, or {@code null} when the batch result contains no user
     *         with that id
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if the batch query failed or the service was shut down before
     *                              the request was processed
     */
    public User getUserById(Long id) throws InterruptedException, ExecutionException {
        UserQueryDto userQueryDto = new UserQueryDto();
        userQueryDto.setId(id);
        CompletableFuture<User> completedFuture = new CompletableFuture<>();
        userQueryDto.setCompletedFuture(completedFuture);

        requestQueue.add(userQueryDto);

        // Blocks until the batch task (or destroy()) completes the future.
        return completedFuture.get();
    }

    /**
     * Starts the scheduler that runs {@link UserBatchThread} immediately and then once every
     * {@code period} milliseconds.
     */
    @PostConstruct
    public void init() {
        scheduledExecutorService = Executors.newScheduledThreadPool(threadNum);
        // Runs once every `period` milliseconds (5 seconds with the value set above).
        scheduledExecutorService.scheduleAtFixedRate(new UserBatchThread(), 0, period, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the scheduler and fails every request that is still queued, so that no caller stays
     * blocked in {@link #getUserById(Long)} waiting for a batch run that will never happen.
     *
     * <p>A batch run that is already in progress is allowed to finish. Requests enqueued after
     * this method has returned are not completed by anyone.
     */
    @PreDestroy
    public void destroy() {
        if (scheduledExecutorService != null) {
            // shutdown() cancels the periodic task but does not interrupt a run in progress.
            scheduledExecutorService.shutdown();
        }
        List<UserQueryDto> abandoned = new ArrayList<>();
        requestQueue.drainTo(abandoned);
        if (abandoned.isEmpty()) {
            return;
        }
        IllegalStateException cause = new IllegalStateException(
                "UserBatchWithFutureServiceImpl was shut down before the request was processed");
        for (UserQueryDto request : abandoned) {
            request.getCompletedFuture().completeExceptionally(cause);
        }
    }

    /**
     * Periodic task that drains the request queue, performs one batch query and hands each
     * result to the future of the request it belongs to.
     */
    public class UserBatchThread implements Runnable {

        @Override
        public void run() {
            // Take out everything the request side has put into the queue so far.
            List<UserQueryDto> batch = new ArrayList<>();
            requestQueue.drainTo(batch);
            if (batch.isEmpty()) {
                return;
            }

            try {
                // Ids passed to the batch query, in the order the requests were dequeued.
                List<Long> requestIds = new ArrayList<>(batch.size());
                for (UserQueryDto request : batch) {
                    requestIds.add(request.getId());
                }

                List<User> response = getUserBatchById(requestIds);
                // When the response contains the same id twice, the later entry wins.
                Map<Long, User> usersById = response.stream().collect(
                        Collectors.toMap(User::getId, Function.identity(), (first, second) -> second));

                // Notify the waiting request threads.
                for (UserQueryDto request : batch) {
                    request.getCompletedFuture().complete(usersById.get(request.getId()));
                }
            } catch (Exception e) {
                // Notify the waiting request threads of the failure. completeExceptionally only
                // affects futures that are still pending, so results that were already delivered
                // are not overwritten (obtrudeException would replace them).
                batch.forEach(request -> request.getCompletedFuture().completeExceptionally(e));
            }
        }

    }

    /**
     * Batch lookup used by the periodic task. This implementation builds one {@link User} per
     * requested id, with the username {@code "dizang" + id}, and prints the ids to standard
     * output.
     *
     * @param ids ids of the users to look up
     * @return one user per entry of {@code ids}, in the same order
     */
    public List<User> getUserBatchById(List<Long> ids) {
        System.out.println("进入批量处理方法" + ids);
        List<User> users = new ArrayList<>();
        for (Long id : ids) {
            User user = new User();
            user.setId(id);
            user.setUsername("dizang" + id);
            users.add(user);
        }
        return users;
    }
}
